Hands-On
Configuration file
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 3000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = ip1:9092,ip2:9092,ip3:9092
# Read data from all topics matched by a regex
a1.sources.r1.kafka.topics.regex = ^bigdata_demo.*
a1.sources.r1.kafka.consumer.group.id = bigdata_flum3
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.bigdata.bigdatautil.TimestampFlinkCDCInterceptor$Builder
# earliest only applies when the group has no committed offsets: the first run consumes all data; if group.id is unchanged, later runs resume from the last committed offset
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
a1.channels.c1.type = file
# The checkpoint directory persists the file channel's state (queue positions), so events survive an agent restart
a1.channels.c1.checkpointDir = /home/bigdata/module/apache-flume-1.9.0-bin/checkpoint
a1.channels.c1.dataDirs = /home/bigdata/module/apache-flume-1.9.0-bin/data
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1123456
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/db/%{topic}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 800
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## Wire the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
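With the configuration saved (the stop script below greps for the file name kafka_to_hdfs.conf), the agent can first be run in the foreground to verify the pipeline end to end — a minimal sketch, assuming the install path used in the config above:

# Foreground test run with console logging
cd /home/bigdata/module/apache-flume-1.9.0-bin
bin/flume-ng agent -n a1 -c conf -f conf/kafka_to_hdfs.conf -Dflume.root.logger=INFO,console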
JVM settings
vi flume-env.sh
export JAVA_OPTS="-Xms2048m -Xmx2048m"
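After editing flume-env.sh the agent must be restarted for the heap settings to take effect; a quick way to confirm the flags were picked up (a sanity check, not part of the original setup) is:

# The agent's main class is org.apache.flume.node.Application, so -l output matches "flume"
jps -lvm | grep flume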
Interceptor example
The HDFS sink resolves %Y-%m-%d in hdfs.path from the event's timestamp header (the config above does not set hdfs.useLocalTimeStamp), so the interceptor must populate that header. The class below is a generic Maxwell-oriented sample; the interceptor type in the config (com.bigdata.bigdatautil.TimestampFlinkCDCInterceptor$Builder) must be the fully qualified name of whatever Builder class you actually package.
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Parse the JSON body emitted by Maxwell
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(log);
        Long ts = jsonObject.getLong("ts");
        String databaseName = jsonObject.getString("database");
        String tableName = jsonObject.getString("table");
        // The ts field in Maxwell's output is in seconds; the Flume HDFS sink
        // expects the timestamp header in milliseconds
        String timeMills = String.valueOf(ts * 1000);
        headers.put("timestamp", timeMills);
        headers.put("database_name", databaseName);
        headers.put("table_name", tableName);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Place the built jar into the /opt/module/flume/lib directory:
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
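Since the agent in the script below runs on master2, the jar also has to land in that node's lib directory — a sketch, assuming the install path from the config above:

# Copy the interceptor jar to the node that runs the agent
scp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar master2:/home/bigdata/module/apache-flume-1.9.0-bin/lib/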
Flume start/stop script
#!/bin/bash
case $1 in
"start")
echo " --------启动 abs3 flume-------"
ssh master2 "nohup /home/bigdata/module/apache-flume-1.9.0-bin/bin/flume-ng agent -n a1 -c /home/bigdata/module/apache-flume-1.9.0-bin/conf -f /home/bigdata/module/apache-flume-1.9.0-bin/conf/kafka_to_hdfs.conf >/dev/null 2>&1 &"
;;
"stop")
echo " --------启动 关闭 flume-------"
ssh master2 "ps -ef | grep kafka_to_hdfs.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
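Saved, for example, as flume_kafka_hdfs.sh (the file name is hypothetical), the script is used like this:

chmod +x flume_kafka_hdfs.sh
./flume_kafka_hdfs.sh start
./flume_kafka_hdfs.sh stop

Once the agent is up, a one-record smoke test can confirm the whole path, assuming the packaged interceptor parses Maxwell-style records as in the example above; broker address and topic name are placeholders, and on Kafka versions before 2.5 the producer flag is --broker-list instead of --bootstrap-server:

# Publish one record into a topic matching ^bigdata_demo.*
echo '{"database":"demo_db","table":"orders","type":"insert","ts":1700000000,"data":{}}' | kafka-console-producer.sh --bootstrap-server ip1:9092 --topic bigdata_demo_orders
# A dated directory should appear under the topic's _inc path
hdfs dfs -ls /origin_data/db/bigdata_demo_orders_inc/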